Amazon Redshift [アップデート] データレイクで用いるカラム名ありパーティション形式のアンロードをサポートしたので試してみた
Amazon Redshiftは、クラスタバージョン1.0.14104からINCLUDEオプションとPARTITION BYを指定して、アンロードされたファイルにパーティション列を追加できるようになりました。個人的には待望の機能なので早速試してみました。
INCLUDEオプションとPARTITION BYの指定とは
INCLUDEオプションとPARTITION BYを指定して、アンロードされたファイルにパーティション列を追加できるようになりました。つまり、UNLOADコマンドを実行する際にPARTITION BYを指定することでデータレイクで用いるカラム名ありパーティション形式でアンロードできるようになりました。もちろん、出力ファイル形式はCSV、Parquetなどお好きなフォーマットで出力可能です。
PARTITION BY ( column_name [, ... ] ) [INCLUDE]
※ 執筆時点では、 上記のオプションは英語マニュアルにのみ記載されています。
PARTITION BY
にカラム指定するとカラムの値ごとにパーティション出力されます。また、INCLUDE
オプションでPARTITION BY
を指定した場合、パーティションカラムはアンロードされたファイルから削除されません。
unload ('select * from lineitem') to 's3://mybucket/lineitem/' iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole' PARQUET PARTITION BY (l_shipdate);
4 つのスライスがある場合、生成される Parquet ファイルはさまざまなフォルダに動的に分割されます。
s3://mybucket/lineitem/l_shipdate=1992-01-02/0000_part_00.parquet 0001_part_00.parquet 0002_part_00.parquet 0003_part_00.parquet s3://mybucket/lineitem/l_shipdate=1992-01-03/0000_part_00.parquet 0001_part_00.parquet 0002_part_00.parquet 0003_part_00.parquet s3://mybucket/lineitem/l_shipdate=1992-01-04/0000_part_00.parquet 0001_part_00.parquet 0002_part_00.parquet 0003_part_00.parquet ...
検証用テーブルとデータ
検証用に用意したのテーブルのlo_orderdateカラムには、19920101〜19921231まで365日分のデータが含まれています。
cmdb=> \d lineorder_test Table "cm_ishikawa_satoru.lineorder_test" Column | Type | Collation | Nullable | Default --------------------+-----------------------+-----------+----------+--------- lo_orderkey | integer | | not null | lo_linenumber | integer | | not null | lo_custkey | integer | | not null | lo_partkey | integer | | not null | lo_suppkey | integer | | not null | lo_orderdate | integer | | not null | lo_orderpriority | character varying(15) | | not null | lo_shippriority | character varying(1) | | not null | lo_quantity | integer | | not null | lo_extendedprice | integer | | not null | lo_ordertotalprice | integer | | not null | lo_discount | integer | | not null | lo_revenue | integer | | not null | lo_supplycost | integer | | not null | lo_tax | integer | | not null | lo_commitdate | integer | | not null | lo_shipmode | character varying(10) | | not null | cmdb=> select count(*) from lineorder_test; count --------- 9106395 (1 row) cmdb=> \x Expanded display is on. cmdb=> select * from lineorder_test limit 1; -[ RECORD 1 ]------+---------- lo_orderkey | 521799110 lo_linenumber | 6 lo_custkey | 739298 lo_partkey | 783047 lo_suppkey | 758132 lo_orderdate | 19920102 lo_orderpriority | 3-MEDIUM lo_shippriority | 0 lo_quantity | 1 lo_extendedprice | 113001 lo_ordertotalprice | 26969219 lo_discount | 2 lo_revenue | 110740 lo_supplycost | 67800 lo_tax | 7 lo_commitdate | 19920317 lo_shipmode | SHIP
以降では、クラスタバージョン1.0.14104、dc2.largeの2ノードで機能の確認とともにUNLOAD時間も計測します。
UNLOAD(CSV出力)
通常のCSV出力は、約25秒でした。dc2.largeの2ノードなのでファイルは4つずつ出力します。
cmdb=> UNLOAD ('select * FROM lineorder_test;') cmdb-> TO 's3://cm-user/lineorder_test/lineorder_tsv/' cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role' cmdb-> GZIP cmdb-> DELIMITER '\t' cmdb-> HEADER cmdb-> ; INFO: UNLOAD completed, 9106395 record(s) unloaded successfully. UNLOAD Time: 21955.632 ms (00:21.956) --- % aws s3 ls s3://cm-user/lineorder_test/lineorder_tsv/ --recursive 2020-03-29 23:16:55 97117240 lineorder_test/lineorder_tsv/0000_part_00.gz 2020-03-29 23:16:54 97531482 lineorder_test/lineorder_tsv/0001_part_00.gz 2020-03-29 23:16:55 97121201 lineorder_test/lineorder_tsv/0002_part_00.gz 2020-03-29 23:16:55 97311619 lineorder_test/lineorder_tsv/0003_part_00.gz
上記にPARTITION BYの指定してカラム名ありパーティション出力すると約2分10秒で365パーティション作成されました。パーティションごとに4つのファイルが作成されます。先頭の0000_part_00.gz〜0003_part_00.gz(L.15~18)の4つのファイルも生成されますのでご注意ください。
cmdb=> UNLOAD ('select * FROM lineorder_test;') cmdb-> TO 's3://cm-user/lineorder_test/lineorder_partitioned_tsv/' cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role' cmdb-> GZIP cmdb-> DELIMITER '\t' cmdb-> HEADER cmdb-> PARTITION BY (lo_orderdate); INFO: UNLOAD completed, 9106395 record(s) unloaded successfully. UNLOAD Time: 130235.088 ms (02:10.235) --- % aws s3 ls s3://cm-user/lineorder_test/lineorder_partitioned_tsv/ --recursive 2020-03-29 23:17:35 136 lineorder_test/lineorder_partitioned_tsv/0000_part_00.gz 2020-03-29 23:17:35 136 lineorder_test/lineorder_partitioned_tsv/0001_part_00.gz 2020-03-29 23:17:35 136 lineorder_test/lineorder_partitioned_tsv/0002_part_00.gz 2020-03-29 23:17:35 136 lineorder_test/lineorder_partitioned_tsv/0003_part_00.gz 2020-03-29 23:17:35 251358 lineorder_test/lineorder_partitioned_tsv/lo_orderdate=19920102/0000_part_00.gz 2020-03-29 23:17:35 261823 lineorder_test/lineorder_partitioned_tsv/lo_orderdate=19920102/0001_part_00.gz 2020-03-29 23:17:35 256716 lineorder_test/lineorder_partitioned_tsv/lo_orderdate=19920102/0002_part_00.gz 2020-03-29 23:17:35 256881 lineorder_test/lineorder_partitioned_tsv/lo_orderdate=19920102/0003_part_00.gz : : 2020-03-29 23:19:44 262736 lineorder_test/lineorder_partitioned_tsv/lo_orderdate=19921231/0002_part_00.gz 2020-03-29 23:19:44 257813 lineorder_test/lineorder_partitioned_tsv/lo_orderdate=19921231/0003_part_00.gz
UNLOAD(PARTITION BYによるParquet出力)
通常のCSV出力は、約20秒で最速でした。dc2.largeの2ノードなのでファイルは4つずつ出力します。通常のCSV出力は、約25秒で最速でした。dc2.largeの2ノードなのでファイルは4つずつ出力します。
cmdb=> UNLOAD ('select * FROM lineorder_test;') cmdb-> TO 's3://cm-user/lineorder_test/lineorder_parquet/' cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role' cmdb-> FORMAT AS PARQUET cmdb-> ; INFO: UNLOAD completed, 9106395 record(s) unloaded successfully. UNLOAD Time: 20348.743 ms (00:20.349) --- % aws s3 ls s3://cm-user/lineorder_test/lineorder_parquet/ --recursive 2020-03-29 21:14:35 78859742 lineorder_test/lineorder_parquet/0000_part_00.parquet 2020-03-29 21:14:35 79212678 lineorder_test/lineorder_parquet/0001_part_00.parquet 2020-03-29 21:14:35 78876263 lineorder_test/lineorder_parquet/0002_part_00.parquet 2020-03-29 21:14:35 79049744 lineorder_test/lineorder_parquet/0003_part_00.parquet
上記にPARTITION BYの指定してカラム名ありパーティション出力すると約2分14秒で365パーティション作成されました。パーティションごとに4つのファイルが作成されます。
cmdb=> UNLOAD ('select * FROM lineorder_test;') cmdb-> TO 's3://cm-user/lineorder_test/lineorder_partitioned_parquet/' cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role' cmdb-> FORMAT AS PARQUET cmdb-> PARTITION BY (lo_orderdate); INFO: UNLOAD completed, 9106395 record(s) unloaded successfully. UNLOAD Time: 134429.230 ms (02:14.429) --- % aws s3 ls s3://cm-user/lineorder_test/lineorder_partitioned_parquet/ --recursive 2020-03-29 21:15:30 255578 lineorder_test/lineorder_partitioned_parquet/lo_orderdate=19920102/0000_part_00.parquet 2020-03-29 21:15:30 264993 lineorder_test/lineorder_partitioned_parquet/lo_orderdate=19920102/0001_part_00.parquet 2020-03-29 21:15:30 260896 lineorder_test/lineorder_partitioned_parquet/lo_orderdate=19920102/0002_part_00.parquet 2020-03-29 21:15:30 260852 lineorder_test/lineorder_partitioned_parquet/lo_orderdate=19920102/0003_part_00.parquet : : 2020-03-29 21:17:37 265705 lineorder_test/lineorder_partitioned_parquet/lo_orderdate=19921231/0002_part_00.parquet 2020-03-29 21:17:38 261312 lineorder_test/lineorder_partitioned_parquet/lo_orderdate=19921231/0003_part_00.parquet
生成されたParquetファイルの中をS3Selectで確認します。PARTITION BYに指定したlo_orderdateカラムは含まれないParquetファイルが作成されます。
[ { "lo_orderkey": 597459780, "lo_linenumber": 3, "lo_custkey": 2221423, "lo_partkey": 666256, "lo_suppkey": 820069, "lo_orderpriority": "3-MEDIUM", "lo_shippriority": "0", "lo_quantity": 43, "lo_extendedprice": 5255546, "lo_ordertotalprice": 15438046, "lo_discount": 9, "lo_revenue": 4782546, "lo_supplycost": 73333, "lo_tax": 8, "lo_commitdate": 19920208, "lo_shipmode": "FOB" }, { "lo_orderkey": 540891970, : : :
UNLOAD(INCLUDEオプション付きPARTITION BYによるParquet出力)
INCLUDEオプション付きINCLUDEオプション付きPARTITION BYの指定してカラム名ありパーティション出力すると約2分9秒で365パーティション作成されました。パーティションごとに4つのファイルが作成されます。
cmdb=> UNLOAD ('select * FROM lineorder_test;') cmdb-> TO 's3://cm-user/lineorder_test/lineorder_partitioned_inc_parquet/' cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role' cmdb-> FORMAT AS PARQUET cmdb-> PARTITION BY (lo_orderdate) INCLUDE; INFO: UNLOAD completed, 9106395 record(s) unloaded successfully. UNLOAD Time: 129937.515 ms (02:09.938) --- % aws s3 ls s3://cm-user/lineorder_test/lineorder_partitioned_inc_parquet/ --recursive 2020-03-29 21:22:06 255819 lineorder_test/lineorder_partitioned_inc_parquet/lo_orderdate=19920102/0000_part_00.parquet 2020-03-29 21:22:06 265234 lineorder_test/lineorder_partitioned_inc_parquet/lo_orderdate=19920102/0001_part_00.parquet 2020-03-29 21:22:06 261137 lineorder_test/lineorder_partitioned_inc_parquet/lo_orderdate=19920102/0002_part_00.parquet 2020-03-29 21:22:06 261093 lineorder_test/lineorder_partitioned_inc_parquet/lo_orderdate=19920102/0003_part_00.parquet : : 2020-03-29 21:24:10 265946 lineorder_test/lineorder_partitioned_inc_parquet/lo_orderdate=19921231/0002_part_00.parquet 2020-03-29 21:24:13 261553 lineorder_test/lineorder_partitioned_inc_parquet/lo_orderdate=19921231/0003_part_00.parquet
生成されたParquetファイルの中をS3Selectで確認します。PARTITION BYに指定したlo_orderdateカラムがそのまま含まれたParquetファイルが作成されます。
[ { "lo_orderkey": 597459780, "lo_linenumber": 3, "lo_custkey": 2221423, "lo_partkey": 666256, "lo_suppkey": 820069, "lo_orderdate": 19920102, "lo_orderpriority": "3-MEDIUM", "lo_shippriority": "0", "lo_quantity": 43, "lo_extendedprice": 5255546, "lo_ordertotalprice": 15438046, "lo_discount": 9, "lo_revenue": 4782546, "lo_supplycost": 73333, "lo_tax": 8, "lo_commitdate": 19920208, "lo_shipmode": "FOB" }, { "lo_orderkey": 540891970, : : :
出力ファイルからGlueクローラで外部テーブルを自動作成
「UNLOAD(Parquet出力)」で作成したファイルを、Glueクローラを用いて外部テーブルを自動作成します。出力したファイルを外部テーブルとして登録することで、Redshift Spectrum、Athena、Glue、EMRなどさまざまなAWSのビッグデータソリューションとの連携が可能になります。手順は画面に従いクローラの作成・実行するだけです。
クローラの名前
クローラのSourceTypeの指定
データストアの追加
別のデータストアの追加
IAMロールの選択
クローラのスケジュール設定
クローラの出力設定
クローラの設定確認
クローラの実行
Athenaからクエリを実行
[Tips] パーティションキーのカスタマイズ
上記の例では、lo_orderdateカラムの年月日の値でパーティション分割しましたが、必ずしも既存のカラムの値に限りません。例えば、lo_orderdateカラムの年月でパーティション分割したい場合は、以下のように年月のlo_ordermonthカラムを導出して、lo_ordermonthでパーティション分割することが可能です。
cmdb=> UNLOAD ('select *, (lo_orderdate / 100) AS lo_ordermonth FROM lineorder_test;') cmdb-> TO 's3://cm-user/lineorder_test/lineorder_partitioned_parquet_ordermonth/' cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role' cmdb-> FORMAT AS PARQUET cmdb-> PARTITION BY (lo_ordermonth); INFO: UNLOAD completed, 9106395 record(s) unloaded successfully. UNLOAD Time: 20248.052 ms (00:20.248) --- % aws s3 ls s3://cm-user/lineorder_test/lineorder_partitioned_parquet_ordermonth/ --recursive 2020-03-30 01:04:45 7557230 lineorder_test/lineorder_partitioned_parquet_ordermonth/lo_ordermonth=199201/0000_part_00.parquet 2020-03-30 01:04:45 7577753 lineorder_test/lineorder_partitioned_parquet_ordermonth/lo_ordermonth=199201/0001_part_00.parquet : : 2020-03-30 01:05:01 7815467 lineorder_test/lineorder_partitioned_parquet_ordermonth/lo_ordermonth=199212/0002_part_00.parquet 2020-03-30 01:05:02 7830526 lineorder_test/lineorder_partitioned_parquet_ordermonth/lo_ordermonth=199212/0003_part_00.parquet
[Tips] Parquetファイルサイズの最適化
最も処理効率の良いParquetファイルサイズは256MB〜1GBですが、そのままUNLOADするとRedshiftのスライスサイズでファイルが小さな分割されてしまい、AthenaやGlue(Spark)で処理する際にパフォーマンスに悪影響が生じる可能性があります。そこで、PARALLEL OFF
指定することで、パーティション内のファイルを1つのファイルにまとめることが可能です。
以下の例では、ファイルをまとめられることを確認しています。
cmdb=> UNLOAD ('select *, (lo_orderdate / 100) AS lo_ordermonth FROM lineorder_test;') cmdb-> TO 's3://cm-user/lineorder_test/lineorder_partitioned_parquet_opt/' cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role' cmdb-> FORMAT AS PARQUET cmdb-> PARALLEL OFF cmdb-> PARTITION BY (lo_ordermonth); INFO: UNLOAD completed, 9106395 record(s) unloaded successfully. UNLOAD Time: 43878.377 ms (00:43.878) --- % aws s3 ls s3://cm-user/lineorder_test/lineorder_partitioned_parquet_opt/ --recursive 2020-03-30 01:47:20 24458111 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199201/000.parquet 2020-03-30 01:47:24 23843710 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199202/000.parquet 2020-03-30 01:47:27 25346415 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199203/000.parquet 2020-03-30 01:47:31 24630728 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199204/000.parquet 2020-03-30 01:47:34 25378878 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199205/000.parquet 2020-03-30 01:47:38 24564990 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199206/000.parquet 2020-03-30 01:47:41 25310834 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199207/000.parquet 2020-03-30 01:47:45 25375873 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199208/000.parquet 2020-03-30 01:47:49 24491352 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199209/000.parquet 2020-03-30 01:47:52 25345431 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199210/000.parquet 2020-03-30 01:47:56 24632677 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199211/000.parquet 2020-03-30 01:47:59 25328213 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199212/000.parquet
最後に
今回の新機能でRedshiftはパーティションした形式でデータを直接、データレイクにアンロードできるようになりました。Redshiftはスキーマオンライトなので、データのロードが必要になりますがクエリ可能なデータであることが保証されます。一方、データレイクはスキーマオンリードなので本来S3のデータがクエリ可能であるかが保証できませんが、Redshiftからアンロードされたファイルはクエリ可能なデータであることが保証できます。この連携を用いることでデータレイクでもスキーマオンライトと同様の信頼性が得られるようになります。
GlueやEMRで大量のデータファイルを読み込み、パーティション形式で出力するとパーティションサイズの大きさや分割されたファイルを纏める処理など、スケールに応じて、ワーカー数やメモリサイズの調整、コードを書き換える煩雑な作業がありました。今後は命令的コードよりも宣言的クエリ言語で記述できるので、Redshiftにデータをロードして、Parquetでアンロードするだけで初期のデータ移行が済ませてしまうケースが増えるのではないかと思います。私も積極的に活用していきたいと思います。